package com.cgycms.webloggather.netty;

import com.alibaba.fastjson.JSON;
import com.cgycms.weblogcommon.model.NettyConstant;
import com.cgycms.weblogcommon.model.WebLogMeta;
import com.cgycms.weblogcommon.model.gather.GatherConfig;
import com.cgycms.weblogcommon.util.LocalIpUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName : NettyClient
 * @Description : 客户端实现
 * @Author : cgy
 * @Date: 2020-09-11 16:23
 */
public class NettyClient {

    private Logger logger = LoggerFactory.getLogger(NettyClient.class);

    private Bootstrap bootstrap;

    private EventLoopGroup worker;

    private int port;

    private String url;

    private int MAX_RETRY_TIMES = 10;

    private List<Channel> channels = new ArrayList<>();

    private final AtomicInteger index = new AtomicInteger();

    public NettyClient(GatherConfig config) {
        this.url = config.getServerHost();
        this.port = config.getServerPort();

        //初始化连接
        int channelNumber = 1;
        if (!StringUtils.isEmpty(config.getChannelNumber())) {
            channelNumber = config.getChannelNumber();
        }

        //最大重试次数
        if (!StringUtils.isEmpty(config.getRetryNumber())) {
            this.MAX_RETRY_TIMES = config.getRetryNumber();
        }
        bootstrap = new Bootstrap();
        worker = new NioEventLoopGroup();
        bootstrap.group(worker);
        bootstrap.channel(NioSocketChannel.class);
        createWebSocket(channelNumber);
    }


    @PreDestroy
    public void close() {
        logger.info("关闭websocket资源...");
        worker.shutdownGracefully();
    }


    /**
     * 客户端调用服务端
     *
     * @param webLogMeta
     * @param retry
     */
    public void remoteCall(final WebLogMeta webLogMeta, int retry) {
        try {
            Channel firstActiveChannel = getFirstActiveChannel(0);
            webLogMeta.setIp(LocalIpUtil.getIp());
            firstActiveChannel.writeAndFlush(JSON.toJSONString(webLogMeta));
        } catch (Exception e) {
            retry++;
            if (retry > MAX_RETRY_TIMES) {
                throw new RuntimeException("调用服务端失败并超出重试次数!");
            } else {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                logger.warn("第{}次尝试....失败,msg:{}", retry,e.getMessage());
                remoteCall(webLogMeta, retry);
            }
        }
    }

    /**
     * 创建连接
     *
     * @param count
     */
    public void createWebSocket(int count) {
        logger.info("Initializing WebSocket... number:({})", count);
        try {
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(NettyConstant.getMaxFrameLength(), 0, 2, 0, 2));
                    pipeline.addLast(new LengthFieldPrepender(2));
                    pipeline.addLast(
                            new StringDecoder(CharsetUtil.UTF_8),
                            new StringEncoder(CharsetUtil.UTF_8)
                    );
                    pipeline.addLast(new ClientChannelHandlerAdapter());
                }
            });
            for (int i = 1; i <= count; i++) {
                ChannelFuture sync = bootstrap.connect(url, port).sync();
                channels.add(sync.channel());
            }
        } catch (InterruptedException e) {
            logger.error("初始化连接失败!" + e.getMessage());
        }
    }

    /**
     * 获取一个连接
     *
     * @param count
     * @return
     */
    private Channel getFirstActiveChannel(int count) {
        Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size()));
        if (!channel.isActive()) {
            reconnect(channel);
            if (count >= channels.size()) {
                throw new RuntimeException("无可用的连接!");
            }
            return getFirstActiveChannel(count + 1);
        }
        return channel;
    }

    /**
     * 重试连接
     *
     * @param channel
     */
    private void reconnect(Channel channel) {
        synchronized (channel) {
            if (channels.indexOf(channel) == -1) {
                return;
            }
            Channel newChannel = bootstrap.connect(url, port).channel();
            channels.set(channels.indexOf(channel), newChannel);
        }
    }
}
